//修改这里的代码写的http://www.importnew.com/10712.html， 仅方便自己查看

/**实现一个磁盘的,记录暂存在磁盘**/
public class DiskTempStorage extends AbstractTempStorage 
{
    private static final Logger logger=LoggerFactory.getLogger(DiskTempStorage.class);
    private static final String DIR_NAME="tempStorage";
    public static final String FILE_SUFFIX=".rmsg";
     
    /**暂存记录文件的路径*/
    private static String path=Thread.currentThread().getContextClassLoader().getResource("").getPath()+DIR_NAME;;
     
    /**历史暂存记录的文件集合*/
    private List<File> history;
    /**暂存记录Reader*/
    private DiskTempStorageIterator iterator;
     
    /**多线程写记录*/
    private Map<Thread,DiskWriter> diskWriterMap=new ConcurrentHashMap<Thread, DiskWriter>();
     
    /**记录暂存消息个数*/
    private AtomicLong count=new AtomicLong();
     
    @Override
    public void add(RabbitMessage msg) 
    {
        Thread thread=Thread.currentThread();
        DiskWriter diskWriter=diskWriterMap.get(thread);
        if(diskWriter==null)
        {
            diskWriter=new DiskWriter(path);
            diskWriterMap.put(thread, diskWriter);
        }
        if(!diskWriter.write(msg))
        {
            logger.error("发生严重错误，请查看相关应用");
            if(listener!=null)
                listener.onError("发生严重错误，请查看相关应用");
        }
        if(listener!=null)
            listener.onIncreaseMsg(count.incrementAndGet());
    }
     
    public void init()
    {
        File pathFile=new File(path);
        if(!pathFile.exists())
            pathFile.mkdir();
         
        //加载历史暂存记录的文件集合
        File[] files=pathFile.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(FILE_SUFFIX);
            }
        });
        if(files==null)
            history=Collections.EMPTY_LIST;
        else
            history=Arrays.asList(files);
         
        //初始化iterator
        iterator=new DiskTempStorageIterator(history.iterator());
    }
    public DiskTempStorage(){}
    private DiskTempStorage(DiskTempStorageIterator iterator)
    {
        this.iterator=iterator;
    }
     
    @Override
    public TempStorage newTempStorage()
    {
        TempStorage tempStorage=new DiskTempStorage(new DiskTempStorageIterator(null));
        tempStorage.setListener(listener);
        //更换暂存器，要把DiskReader的数据来源换成workSpace
        history=new ArrayList<File>();
        for(DiskWriter writer:diskWriterMap.values())
        {
            history.addAll(writer.getFiles());
            writer.stop();
        }
        iterator.setFiles(history.iterator());
        logger.info("#########################切换暂存器##############################");
        return tempStorage;
    }
     
    @Override
    public boolean hasMsg()
    {
        return count.get()>0||(history!=null&&history.size()>0);      
    }
 
    @Override
    public long size()
    {
        return count.get();
    }
     
    @Override
    public void clear() 
    {
        //清空diskReader关联的文件集合
        diskWriterMap.clear();
        if(history==null)
            return;
        boolean res;
        for(File file:history)
        {
            do{
                System.gc();
                res=file.delete();
                logger.info("#############################删除文件："+res+"======"+file.getName());
            }while(!res);
        }
        history=null;
        logger.info("################################清理了"+iterator.getCount()+"个消息");
    }
 
    @Override
    public Iterator<RabbitMessage> iterator() 
    {
        return iterator;
    }
     
}

public class DiskTempStorageIterator implements Iterator<RabbitMessage>
{
    private static final Logger logger=LoggerFactory.getLogger(DiskTempStorageIterator.class);
    private static final int CHUNK_SIZE = 4096;
    private Iterator<File> files;
     
    private RabbitMessage rmsg;
    public long getCount() {
        return count;
    }
 
    private long position=0;
    private FileChannel channel;
    private MappedByteBuffer buffer;
    private RandomAccessFile randomAccessFile;
    private long count=0;
 
    @Override
    public boolean hasNext() 
    {
        RabbitMsgDecoder decoder;
        int size;
        do{
            buffer = getBuffer(RabbitMsgDecoder.HEAD_SIZE);
            if(buffer==null)
                return false;
             
            decoder=new RabbitMsgDecoder(buffer);
            position+=RabbitMsgDecoder.HEAD_SIZE;
            size=decoder.size();
            if(size==0)
            {
                buffer=nextFileBuffer(size);
                continue;
            }
            buffer = getBuffer(size);
            rmsg=decoder.decode(buffer);
            count++;
            position+=size;
            if(rmsg!=null)
                return true;
        }while(true);   
    }
     
    private MappedByteBuffer nextFileBuffer(long length)
    {
        if (channel != null) 
        {
            try {
                buffer=null;
                channel.close();
                randomAccessFile.close();
            } catch (IOException e) {
                logger.error("IOException",e);
            }
            channel = null;
        }
        if (files.hasNext()) 
        {
            try{
                File file = files.next();
                randomAccessFile=new RandomAccessFile(file, "r");
                channel = randomAccessFile.getChannel();
                position = 0;
                long fileSize=channel.size();
                fileSize=fileSize<length?fileSize:length;
                return buffer=channel.map(MapMode.READ_ONLY, position, fileSize>CHUNK_SIZE?fileSize:CHUNK_SIZE);
            }catch (IOException e) {
                logger.error("IOException",e);
            }
        } 
        return buffer=null;
    }
     
    private MappedByteBuffer getBuffer(long length) 
    {
        if (!(buffer == null ||buffer.remaining()<=length||!buffer.hasRemaining()))
            return buffer;
 
        try{
            if (channel == null || channel.size() == position) 
            {
                if (channel != null) 
                {
                    try {
                        buffer=null;
                        channel.close();
                        randomAccessFile.close();
                    } catch (IOException e) {
                        logger.error("IOException",e);
                    }
                    channel = null;
                }
                if (files.hasNext()) 
                {
                    File file = files.next();
                    randomAccessFile=new RandomAccessFile(file, "r");
                    channel = randomAccessFile.getChannel();
                    position = 0;
                } else
                    return buffer=null;
            }
            long fileSize=channel.size(),remain=fileSize-position;
            if (remain<length) {
                length =(int)(fileSize-position);
                return nextFileBuffer(length);
            }
            else
                return buffer=channel.map(MapMode.READ_ONLY, position, length);
        }catch (IOException e) {
            logger.error("IOException",e);
            return null;
        }
    }
 
    @Override
    public RabbitMessage next() 
    {
        RabbitMessage res=rmsg;
        rmsg=null;
        return res;
    }
     
    public DiskTempStorageIterator(Iterator<File> files)
    {
        count=0;
        this.files=files;
    }
     
    public Iterator<File> getFiles() {
        return files;
    }
 
    public void setFiles(Iterator<File> files) {
        count=0;
        this.files = files;
    }
 
    @Override
    public void remove() 
    {
        throw new UnsupportedOperationException();
    }
 
}


public class DiskWriter 
{
    private static final Logger logger=LoggerFactory.getLogger(DiskWriter.class);
    private static final int CHUNK_SIZE = 4096;
    private static final long MAX_FILE_LENGTH=200000000l;
     
    private String path;
    private File file;
    private List<File> files=new ArrayList<File>();
     
    private long position=0;
    private FileChannel channel;
    private MappedByteBuffer buffer;
    private RandomAccessFile randomAccessFile;
     
    public DiskWriter(String path)
    {
        this.path=path;
        changeOtherFile();
    }
     
    public boolean write(RabbitMessage msg)
    {
        RabbitMsgEncoder encoder=new RabbitMsgEncoder(msg);
        int length=encoder.size();
        buffer = getBuffer(length);
        if(buffer==null)
            return false;
         
        encoder.encode(buffer);
        position+=length;
        if(position>MAX_FILE_LENGTH)
            changeOtherFile();
        return true;
    }
     
    /**获得max{length,N}大小的内存映射*/
    private MappedByteBuffer getBuffer(long length) 
    {
        length=length>CHUNK_SIZE?length:CHUNK_SIZE;
         
        try {
            if (buffer == null ) 
                buffer=channel.map(MapMode.READ_WRITE, position, length);
            else if(buffer.remaining()<length||!buffer.hasRemaining())
            {
                buffer.force();
                buffer=channel.map(MapMode.READ_WRITE, position, length);
            }
        } catch (IOException e) {
            logger.error("IOException",e);
            return null;
        }
        return buffer;
    }
     
    private void changeOtherFile()
    {
        stop();
        file=generateFile();
        try {
            randomAccessFile=new RandomAccessFile(file, "rw");
            channel=randomAccessFile.getChannel();
        } catch (FileNotFoundException e) {
            logger.error("不可能出错",e);
        }
        files.add(file);
    }
     
    private File generateFile()
    {
        String fileName=path+File.separator+System.currentTimeMillis()+"-"+Thread.currentThread().getId()+DiskTempStorage.FILE_SUFFIX;
        File file=new File(fileName);
        try {
            file.createNewFile();
            logger.debug("创建文件："+fileName);
        } catch (IOException e) {
            logger.error("生成记录文件失败"+fileName,e);
        }
        return file;
    }
     
    public void stop()
    {
        position=0;
        buffer=null;
        try {
            if(channel!=null)
                channel.close();
            if(randomAccessFile!=null)
                randomAccessFile.close();
        } catch (IOException e1) {
            logger.error(file+"文件关闭出错",e1);
        }
    }
 
    public List<File> getFiles() {
        return files;
    }
}

